
This notebook explores the use of ODC with Dask LocalCluster. The goal is to introduce fundamental concepts and the role Dask can serve with datacube and subsequent computation using xarray.
The example computation is fairly typical of an EO data processing pipeline. We'll be using a small area and time period to start with and progressively scaling this example. EO scientists may find some aspects of these examples unrealistic, but this isn't an EO science course. :-).
For the base example we'll be using the Australian island state of Tasmania as our Region of Interest (ROI): initially a paddock-sized area, progressively increasing to the entire island. The basic algorithm is:
- datacube query
- datacube.load()
- mask out cloud-affected pixels
- compute NDVI

Some cells in this notebook will take minutes to run, so be patient.
# EASI tools
import git
import sys, os
os.environ['USE_PYGEOS'] = '0'
repo = git.Repo('.', search_parent_directories=True).working_tree_dir
if repo not in sys.path: sys.path.append(repo)
from easi_tools import EasiDefaults, notebook_utils
easi = EasiDefaults()
Successfully found configuration for deployment "chile"
import datacube
from datacube.utils import masking
The next cell sets out all the query parameters used in our datacube.load().
For this run we keep the ROI quite small.
# Get the default latitude & longitude extents
study_area_lat = easi.latitude
study_area_lon = easi.longitude
# Or choose your own by uncommenting and modifying this section
###############################################################
# # Central Tasmania (near Little Pine Lagoon)
# central_lat = -42.019
# central_lon = 146.615
# # Set the buffer to load around the central coordinates
# # This is a radial distance from the central coordinates, so the bbox is 2x the buffer in each dimension
# buffer = 0.05
# # Compute the bounding box for the study area
# study_area_lat = (central_lat - buffer, central_lat + buffer)
# study_area_lon = (central_lon - buffer, central_lon + buffer)
###############################################################
# Data product
product = easi.product('landsat')
# product = 'landsat8_c2l2_sr'
# Set the date range to load data over
set_time = easi.time
# set_time = ("2021-01-01", "2021-01-31")
# Set the measurements/bands to load. None will load all of them
measurements = None
# Set the coordinate reference system and output resolution
set_crs = easi.crs('landsat') # If defined, else None
set_resolution = easi.resolution('landsat') # If defined, else None
# set_crs = "epsg:3577"
# set_resolution = (-30, 30)
group_by = "solar_day"
Now initialise the datacube.
dc = datacube.Datacube()
# Access AWS "requester-pays" buckets
# This is necessary for reading data from most third-party AWS S3 buckets such as for Landsat and Sentinel-2
from datacube.utils.aws import configure_s3_access
configure_s3_access(aws_unsigned=False, requester_pays=True);
Now load the data. This first dc.load() does not use Dask, so it will take a little bit of time.
We use %%time to keep track of how long things take to complete.
%%time
dataset = None # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling={"qa_pixel": "nearest", "*": "average"},
output_crs=set_crs,
resolution=set_resolution,
group_by=group_by,
)
CPU times: user 2.2 s, sys: 4.78 s, total: 6.98 s Wall time: 20.1 s
The result of the datacube.load() function is an xarray.Dataset. The notebook can be used to render a description of the dataset variable as an html block with a lot of useful information about the structure of data.
If you open up the Data variables (click the > Data variables) and click on the stacked cylinders for one of them you will see the actual data array is available and shown in summary form.
NOTE that you can see real numbers in the array when you do this. This will change when we start using Dask.
This visualisation will become increasingly important when Dask is enabled and as we scale out, so take a moment now to poke around the interface. Depending on your area of interest set above, you should have a relatively small area (perhaps around 300 to 400 pixels in each of the x and y dimensions) and perhaps up to 10 time slices. This is a relatively small scale and fine to do without using Dask.
dataset
<xarray.Dataset>
Dimensions: (time: 6, y: 381, x: 335)
Coordinates:
* time (time) datetime64[ns] 2022-02-07T14:39:10.740819 ... 2022-04...
* y (y) float64 6.692e+06 6.692e+06 ... 6.681e+06 6.681e+06
* x (x) float64 8.572e+05 8.572e+05 ... 8.672e+05 8.672e+05
spatial_ref int32 32718
Data variables:
coastal (time, y, x) uint16 41267 40757 40797 41270 ... 9488 9355 9121
blue (time, y, x) uint16 41290 40840 40834 41300 ... 10065 9930 9576
green (time, y, x) uint16 39954 39505 39326 ... 11160 11214 10860
red (time, y, x) uint16 39881 39380 39245 ... 12368 12408 11827
nir08 (time, y, x) uint16 38927 38419 38287 ... 13923 14054 13285
swir16 (time, y, x) uint16 28605 28092 27987 ... 16652 16490 15362
swir22 (time, y, x) uint16 21633 21250 21185 ... 15800 15671 14830
qa_pixel (time, y, x) uint16 22280 22280 22280 ... 21824 21824 21824
qa_aerosol (time, y, x) uint8 204 220 224 208 221 224 ... 96 96 96 96 96
qa_radsat (time, y, x) uint16 0 0 0 0 0 0 0 0 0 0 ... 0 0 0 0 0 0 0 0 0 0
Attributes:
crs: epsg:32718
grid_mapping: spatial_ref
Next up, filter out pixels which are affected by cloud and other issues, and compute the NDVI. Since we aren't specifying a time range, this will be performed for all images.
%%time
# Identify pixels that don't have cloud, cloud shadow or water
from datacube.utils import masking
good_pixel_flags = {
'nodata': False,
'cloud': 'not_high_confidence',
'cloud_shadow': 'not_high_confidence',
'water': 'land_or_cloud'
}
cloud_free_mask = masking.make_mask(dataset['qa_pixel'], **good_pixel_flags)
# Apply the mask
cloud_free = dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free.nir08 - cloud_free.red
band_sum = cloud_free.nir08 + cloud_free.red
# Calculate NDVI
ndvi = None
ndvi = band_diff / band_sum
CPU times: user 24.2 ms, sys: 12.1 ms, total: 36.3 ms Wall time: 35.9 ms
The result ndvi is an xarray.DataArray. Let's take a look at it. Again the notebook will render an html version of the data in summary form.
Notice again the actual data values are being shown and that there are the same number of time slices as above and the x and y dimensions are identical.
ndvi
<xarray.DataArray (time: 6, y: 381, x: 335)>
array([[[ nan, nan, nan, ..., nan,
nan, nan],
[ nan, nan, nan, ..., nan,
nan, nan],
[ nan, nan, nan, ..., nan,
nan, nan],
...,
[ nan, nan, nan, ..., nan,
nan, nan],
[ nan, nan, nan, ..., nan,
nan, nan],
[ nan, nan, nan, ..., nan,
nan, nan]],
[[ nan, nan, nan, ..., nan,
nan, nan],
[ nan, nan, nan, ..., nan,
nan, nan],
[ nan, nan, nan, ..., nan,
nan, nan],
...
[ nan, nan, nan, ..., nan,
nan, nan],
[ nan, nan, nan, ..., nan,
nan, nan],
[ nan, nan, nan, ..., nan,
nan, nan]],
[[ nan, nan, nan, ..., 0.06816315,
0.07301709, 0.07054072],
[ nan, nan, nan, ..., 0.06340865,
0.06887292, 0.06703169],
[ nan, nan, nan, ..., 0.06328423,
0.06745553, 0.0685059 ],
...,
[0.06371912, 0.09801642, 0.08978288, ..., 0.04389915,
0.0333082 , 0.03885442],
[0.07099895, 0.07649794, 0.06243465, ..., 0.04756806,
0.03867994, 0.03976882],
[0.04632761, 0.04720268, 0.04689552, ..., 0.05914572,
0.0622024 , 0.05805989]]])
Coordinates:
* time (time) datetime64[ns] 2022-02-07T14:39:10.740819 ... 2022-04...
* y (y) float64 6.692e+06 6.692e+06 ... 6.681e+06 6.681e+06
* x (x) float64 8.572e+05 8.572e+05 ... 8.672e+05 8.672e+05
spatial_ref int32 32718
Raw numbers aren't nice to look at, so let's draw a time slice. We'll select just one to draw and pick one that wasn't completely masked out by cloud. You can see that all clouds and water have been masked out, so we are just looking at the NDVI of the land area.
ndvi.isel(time=1).plot()
<matplotlib.collections.QuadMesh at 0x7f49f481afe0>
Let's set our time range to a couple of weeks, or approximately two passes of Landsat 8 for this ROI. Less data will allow us to explore how dask works with the datacube and xarray libraries.
set_time = ("2021-01-01", "2021-01-14")
%%time
dataset = None # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling={"qa_pixel": "nearest", "*": "average"},
output_crs=set_crs,
resolution=set_resolution,
group_by=group_by,
)
dataset
CPU times: user 359 ms, sys: 769 ms, total: 1.13 s Wall time: 4.36 s
<xarray.Dataset>
Dimensions: (time: 1, y: 381, x: 335)
Coordinates:
* time (time) datetime64[ns] 2021-01-03T14:39:19.317361
* y (y) float64 6.692e+06 6.692e+06 ... 6.681e+06 6.681e+06
* x (x) float64 8.572e+05 8.572e+05 ... 8.672e+05 8.672e+05
spatial_ref int32 32718
Data variables:
coastal (time, y, x) uint16 38342 38311 37994 ... 43075 43200 43245
blue (time, y, x) uint16 38117 38116 37811 ... 43014 43109 43154
green (time, y, x) uint16 36410 36469 36214 ... 41419 41545 41591
red (time, y, x) uint16 36244 36332 36158 ... 41362 41495 41545
nir08 (time, y, x) uint16 35466 35587 35444 ... 40546 40676 40735
swir16 (time, y, x) uint16 28398 28591 28442 ... 28314 28429 28429
swir22 (time, y, x) uint16 22729 22865 22740 ... 21030 21133 21130
qa_pixel (time, y, x) uint16 22280 22280 22280 ... 22280 22280 22280
qa_aerosol (time, y, x) uint8 224 206 220 224 210 ... 224 224 224 224 224
qa_radsat (time, y, x) uint16 0 0 0 0 0 0 0 0 0 0 ... 0 0 0 0 0 0 0 0 0 0
Attributes:
crs: epsg:32718
grid_mapping: spatial_ref
As before, you can see the actual data in the results, but this time there should only be 1 or 2 observation times.
Now let's create a LocalCluster as we did in the earlier notebook.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
client
Client-c74f3414-fa49-11ed-99da-1eb1b782f397
Connection method: Cluster object | Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status
Workers: 4 | Total threads: 8 | Total memory: 29.00 GiB
Status: running | Using processes: True
Scheduler-04088dc5-463b-4f93-8651-13df22de11c5 | Started: Just now
(per-worker details omitted: 4 workers, each with 2 threads and 7.25 GiB memory)
You may like to open up the dashboard for the cluster, although for this notebook we won't be talking about the dashboard (that's for a later discussion).
notebook_utils.localcluster_dashboard(client=client,server=easi.hub)
'https://hub.datacubechile.cl/user/jhodge/proxy/8787/status'
Now that we are using a cluster, even though it is local, we need to make sure that our cluster has the right configuration to use Requester Pays buckets in AWS S3. To do this, we need to re-run the configure_s3_access() function that we ran earlier, but we need to pass the client to the function as well.
from datacube.utils.aws import configure_s3_access
configure_s3_access(aws_unsigned=False, requester_pays=True, client=client);
datacube.load() will use the default dask cluster (the one we just created) if the dask_chunks parameter is specified.
The chunk shape and memory size are critical parameters in tuning Dask, and we will discuss them in great detail as the scale increases. For now we're simply going to specify that the time dimension should be chunked individually (1 slice of time per chunk); by not specifying any chunking for the other dimensions, each of them will form a single contiguous block.
If that made no sense whatsoever, that's fine, because we will look at an example.
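To make the idea concrete before touching the datacube, here is a minimal dask.array sketch of what a `{"time": 1}`-style chunking does. The shapes here are illustrative stand-ins, not taken from the load above.

```python
# Illustrative only: a (time, y, x) array chunked so each time slice is its own chunk.
import numpy as np
import dask.array as da

data = np.zeros((2, 381, 335), dtype="uint16")   # pretend (time, y, x) raster stack
arr = da.from_array(data, chunks=(1, 381, 335))  # time chunked to length 1

print(arr.chunks)       # ((1, 1), (381,), (335,)) - two chunks along time
print(arr.npartitions)  # 2 independent chunk tasks Dask can run in parallel
```

Note that y and x each remain a single block because we gave them their full extent as the chunk size, which is what omitting them from `dask_chunks` does.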
chunks = {"time":1}
%%time
dataset = None # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling={"qa_pixel": "nearest", "*": "average"},
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = chunks, ###### THIS IS THE ONLY LINE CHANGED. #####
group_by=group_by,
)
dataset
CPU times: user 28.3 ms, sys: 99 µs, total: 28.4 ms Wall time: 35.5 ms
<xarray.Dataset>
Dimensions: (time: 1, y: 381, x: 335)
Coordinates:
* time (time) datetime64[ns] 2021-01-03T14:39:19.317361
* y (y) float64 6.692e+06 6.692e+06 ... 6.681e+06 6.681e+06
* x (x) float64 8.572e+05 8.572e+05 ... 8.672e+05 8.672e+05
spatial_ref int32 32718
Data variables:
coastal (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
blue (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
green (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
red (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
nir08 (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
swir16 (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
swir22 (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
qa_pixel (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
qa_aerosol (time, y, x) uint8 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
qa_radsat (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
Attributes:
crs: epsg:32718
grid_mapping: spatial_ref
The first thing you probably noticed is that, whilst only one line changed, the load time dropped to sub-second!
The second thing you probably noticed is if you look at one of the data variables by clicking on the database icon as before, there is no data but instead there is a diagram which shows you the Dask Chunks for each measurement. It's really fast because it didn't actually load any data!
When datacube has dask_chunks specified it switches from creating NumPy-backed xarrays to dask.array-backed ones and lazily loads them - this means that no data is loaded until it is used. If you look at one of the data variables you will see it now shows dask.array<chunksize=(....)> rather than values, and the cylinder icon will show the Array and Chunk parameters along with some statistics, not actual data.
The datacube.load() has used Dask's delayed-execution interface, which will not perform any tasks (Dask's name for calculations) until the result of the task is actually required. We'll load the data in a moment, but first let's take a look at the parameters in that pretty visualisation. Click on the cylinder for the red data variable and look at the table and the figure. It should look similar to the image below.
Looking at this image (yours may be different), you can see that:
- The array is 221.92 kiB in total size and is broken into chunks of 110.96 kiB each.
- The full array shape is (2, 375, 303) (time, y, x), but each chunk is (1, 375, 303) because we specified the time dimension should have chunks of length 1.
- There are 2 chunk tasks, one for each time slice, and in this instance only one graph layer. More complex calculations will have more layers in the graph.
- The data type is uint16 and the array is split up into chunks which are numpy.ndarrays.

The chunking has split the array loading into two chunks. Dask can execute these in parallel.
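The chunk memory size follows directly from the chunk shape and dtype. A back-of-envelope sketch (the shape and dtype here are assumptions for illustration, so the result won't necessarily match your figure, which depends on the band you clicked):

```python
import numpy as np

shape = (2, 375, 303)   # full array (time, y, x)
chunk = (1, 375, 303)   # per-chunk shape with {"time": 1}
itemsize = np.dtype("uint16").itemsize  # 2 bytes per pixel

n_chunks = shape[0] // chunk[0]
chunk_kib = chunk[0] * chunk[1] * chunk[2] * itemsize / 1024

print(n_chunks)             # 2
print(round(chunk_kib, 2))  # 221.92 (KiB per chunk for a uint16 band)
```

The same arithmetic is how you will later budget chunk sizes against worker memory when scaling up.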
We can look at the delayed tasks and how they will be executed by visualising the task graph for one of the variables. We'll use the red band measurement.
dataset.red.data.visualize()
Details on the task graph can be found in the Dask user guide, but what's clear is that there are two independent paths of execution which produce one time slice each, (0, 0, 0) and (1, 0, 0); these are the two chunks that the full array has been split into.
To retrieve the actual data we need to compute() the result; this will cause all the delayed tasks to be executed for the variable we are computing. Let's compute() the red variable.
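The same delayed-then-compute mechanics can be seen with a toy dask.array, independent of the satellite data:

```python
import dask.array as da

lazy = da.arange(6, chunks=2) * 2  # builds a task graph; no computation happens yet
result = lazy.compute()            # executes the graph and returns a numpy array

print(result)  # [ 0  2  4  6  8 10]
```

Until compute() is called, `lazy` is just a recipe; afterwards, `result` is ordinary in-memory data.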
%%time
actual_red = dataset.red.compute()
actual_red
CPU times: user 140 ms, sys: 37.8 ms, total: 178 ms Wall time: 1.38 s
<xarray.DataArray 'red' (time: 1, y: 381, x: 335)>
array([[[36244, 36332, 36158, ..., 42967, 42709, 42677],
[35998, 35979, 35849, ..., 42915, 42607, 42484],
[35682, 35681, 35549, ..., 42836, 42604, 42447],
...,
[37428, 37620, 37992, ..., 41429, 41547, 41576],
[37429, 37471, 37771, ..., 41378, 41484, 41530],
[37592, 37433, 37576, ..., 41362, 41495, 41545]]], dtype=uint16)
Coordinates:
* time (time) datetime64[ns] 2021-01-03T14:39:19.317361
* y (y) float64 6.692e+06 6.692e+06 ... 6.681e+06 6.681e+06
* x (x) float64 8.572e+05 8.572e+05 ... 8.672e+05 8.672e+05
spatial_ref int32 32718
Attributes:
units: reflectance
nodata: 0
crs: epsg:32718
grid_mapping: spatial_ref
As you can see, we now have actual data (there are real numbers, not just Dask arrays). You can do the same thing for all arrays in the dataset in one go by computing the dataset itself.
%%time
actual_dataset = dataset.compute()
actual_dataset
CPU times: user 81.7 ms, sys: 614 µs, total: 82.3 ms Wall time: 1.33 s
<xarray.Dataset>
Dimensions: (time: 1, y: 381, x: 335)
Coordinates:
* time (time) datetime64[ns] 2021-01-03T14:39:19.317361
* y (y) float64 6.692e+06 6.692e+06 ... 6.681e+06 6.681e+06
* x (x) float64 8.572e+05 8.572e+05 ... 8.672e+05 8.672e+05
spatial_ref int32 32718
Data variables:
coastal (time, y, x) uint16 38342 38311 37994 ... 43075 43200 43245
blue (time, y, x) uint16 38117 38116 37811 ... 43014 43109 43154
green (time, y, x) uint16 36410 36469 36214 ... 41419 41545 41591
red (time, y, x) uint16 36244 36332 36158 ... 41362 41495 41545
nir08 (time, y, x) uint16 35466 35587 35444 ... 40546 40676 40735
swir16 (time, y, x) uint16 28398 28591 28442 ... 28314 28429 28429
swir22 (time, y, x) uint16 22729 22865 22740 ... 21030 21133 21130
qa_pixel (time, y, x) uint16 22280 22280 22280 ... 22280 22280 22280
qa_aerosol (time, y, x) uint8 224 206 220 224 210 ... 224 224 224 224 224
qa_radsat (time, y, x) uint16 0 0 0 0 0 0 0 0 0 0 ... 0 0 0 0 0 0 0 0 0 0
Attributes:
crs: epsg:32718
grid_mapping: spatial_ref
From the above we can see that specifying dask_chunks in datacube.load() splits the load() operation into a set of chunk-shaped arrays and delayed tasks. Dask can now perform those tasks in parallel. Dask will only compute the results for the parts of the data we are using, but we can force the computation of all the delayed tasks using compute().
There is a lot more to Dask than this simple example describes, but let's focus on the impact of Dask on ODC for this simple case.
The time period and ROI are far too small to be interesting so let's change our time range to a full year of data.
set_time = ("2021-01-01", "2021-12-31")
First load the data without dask (no dask_chunks specified).
NOTE that this will take several minutes so be patient
%%time
dataset = None # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling={"qa_pixel": "nearest", "*": "average"},
output_crs=set_crs,
resolution=set_resolution,
group_by=group_by,
)
dataset
CPU times: user 9.65 s, sys: 17.5 s, total: 27.1 s Wall time: 1min 26s
<xarray.Dataset>
Dimensions: (time: 22, y: 381, x: 335)
Coordinates:
* time (time) datetime64[ns] 2021-01-03T14:39:19.317361 ... 2021-12...
* y (y) float64 6.692e+06 6.692e+06 ... 6.681e+06 6.681e+06
* x (x) float64 8.572e+05 8.572e+05 ... 8.672e+05 8.672e+05
spatial_ref int32 32718
Data variables:
coastal (time, y, x) uint16 38342 38311 37994 37453 ... 9836 9832 9814
blue (time, y, x) uint16 38117 38116 37811 ... 10336 10392 10318
green (time, y, x) uint16 36410 36469 36214 ... 11842 11640 11518
red (time, y, x) uint16 36244 36332 36158 ... 13181 12926 12585
nir08 (time, y, x) uint16 35466 35587 35444 ... 14604 14630 14254
swir16 (time, y, x) uint16 28398 28591 28442 ... 17902 17175 16479
swir22 (time, y, x) uint16 22729 22865 22740 ... 16694 16075 15613
qa_pixel (time, y, x) uint16 22280 22280 22280 ... 21824 21824 21824
qa_aerosol (time, y, x) uint8 224 206 220 224 210 ... 166 160 106 96 96
qa_radsat (time, y, x) uint16 0 0 0 0 0 0 0 0 0 0 ... 0 0 0 0 0 0 0 0 0 0
Attributes:
crs: epsg:32718
grid_mapping: spatial_ref
There should now be many more time observations (22 in this run, though your deployment may differ) and the load takes on the order of minutes.
Let's enable dask and repeat the load. We're chunking by time (length one) so dask will be able to load each time slice in parallel. The data variables are also independent so will be done in parallel as well.
chunks = {"time":1}
%%time
dataset = None # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling={"qa_pixel": "nearest", "*": "average"},
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = chunks, ###### THIS IS THE ONLY LINE CHANGED. #####
group_by=group_by,
)
dataset
CPU times: user 66.6 ms, sys: 6.84 ms, total: 73.4 ms Wall time: 107 ms
<xarray.Dataset>
Dimensions: (time: 22, y: 381, x: 335)
Coordinates:
* time (time) datetime64[ns] 2021-01-03T14:39:19.317361 ... 2021-12...
* y (y) float64 6.692e+06 6.692e+06 ... 6.681e+06 6.681e+06
* x (x) float64 8.572e+05 8.572e+05 ... 8.672e+05 8.672e+05
spatial_ref int32 32718
Data variables:
coastal (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
blue (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
green (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
red (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
nir08 (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
swir16 (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
swir22 (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
qa_pixel (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
qa_aerosol (time, y, x) uint8 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
qa_radsat (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
Attributes:
crs: epsg:32718
grid_mapping: spatial_ref
Whoa, that was fast - but we didn't actually compute anything, so no load has occurred and all tasks are pending. Open up the Data variables, click the stacked cylinders and take a look at the delayed task counts. These exist for every variable.
Let's visualise the task graph for the red band.
dataset.red.data.visualize()
Well that's not as useful, is it!
You should just be able to make out that each of the chunks can be loaded independently. The time chunk length is 1, so each chunk holds an individual time slice. This holds true for all the bands, so Dask can spread the loads across multiple threads.
Tip: Visualising task graphs is less effective as your task graph complexity increases. You may need to use simpler examples to see what is going on.
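When the graph is too big to draw usefully, you can still inspect it numerically. A sketch with a stand-in array of the same chunking (the shape is illustrative):

```python
import dask.array as da

# Stand-in for a (time, y, x) load chunked with {"time": 1}
arr = da.zeros((22, 381, 335), chunks=(1, 381, 335))

print(arr.npartitions)            # 22 chunks, one per time slice
print(len(arr.__dask_graph__()))  # number of keys (tasks) in the graph
```

Chunk and task counts like these are often more informative than a sprawling graph image.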
Let's get the actual data
%%time
actual_dataset = dataset.compute()
actual_dataset
/env/lib/python3.10/site-packages/rasterio/warp.py:344: NotGeoreferencedWarning: Dataset has no geotransform, gcps, or rpcs. The identity matrix will be returned. _reproject( /env/lib/python3.10/site-packages/rasterio/warp.py:344: NotGeoreferencedWarning: Dataset has no geotransform, gcps, or rpcs. The identity matrix will be returned. _reproject(
CPU times: user 470 ms, sys: 162 ms, total: 632 ms Wall time: 7.69 s
<xarray.Dataset>
Dimensions: (time: 22, y: 381, x: 335)
Coordinates:
* time (time) datetime64[ns] 2021-01-03T14:39:19.317361 ... 2021-12...
* y (y) float64 6.692e+06 6.692e+06 ... 6.681e+06 6.681e+06
* x (x) float64 8.572e+05 8.572e+05 ... 8.672e+05 8.672e+05
spatial_ref int32 32718
Data variables:
coastal (time, y, x) uint16 38342 38311 37994 37453 ... 9836 9832 9814
blue (time, y, x) uint16 38117 38116 37811 ... 10336 10392 10318
green (time, y, x) uint16 36410 36469 36214 ... 11842 11640 11518
red (time, y, x) uint16 36244 36332 36158 ... 13181 12926 12585
nir08 (time, y, x) uint16 35466 35587 35444 ... 14604 14630 14254
swir16 (time, y, x) uint16 28398 28591 28442 ... 17902 17175 16479
swir22 (time, y, x) uint16 22729 22865 22740 ... 16694 16075 15613
qa_pixel (time, y, x) uint16 22280 22280 22280 ... 21824 21824 21824
qa_aerosol (time, y, x) uint8 224 206 220 224 210 ... 166 160 106 96 96
qa_radsat (time, y, x) uint16 0 0 0 0 0 0 0 0 0 0 ... 0 0 0 0 0 0 0 0 0 0
Attributes:
crs: epsg:32718
grid_mapping: spatial_ref
How fast this step is will depend on how many cores are in your Jupyter notebook's local cluster. In real-world scenarios on an 8-core cluster, datacube.load() may take between 1/4 and 1/6 of the time compared to loading without Dask, depending on many factors. This is great!
Why not 1/8 of the time?
Dask has overheads, and datacube.load() itself is IO-limited. All sorts of things impose limits, and part of the art of parallel computing is tuning your algorithm to reduce their impact and achieve greater performance. As we scale up this example we'll explore some of these.
Tip: recent updates to Dask have greatly improved performance and we are now seeing more substantial performance gains, more in line with the increase in cores.
Do not always expect 8x as many cores to produce an 8x speed-up. Algorithms can be tuned to perform better (or worse) as scale increases. This is part of the art of parallel programming. Dask does its best, and you can often do better.
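One way to reason about this is Amdahl's law: if only a fraction p of the work parallelises, n cores can never deliver an n-fold speed-up. A sketch with hypothetical numbers (p and n here are illustrative, not measured from this notebook):

```python
def speedup(p: float, n: int) -> float:
    """Amdahl's law: overall speed-up when a fraction p of the work runs on n cores."""
    return 1 / ((1 - p) + p / n)

print(round(speedup(0.9, 8), 2))   # 4.71 - far from 8x when 10% of the work is serial
print(round(speedup(0.99, 8), 2))  # 7.48 - much closer when only 1% is serial
```

The serial fraction here stands in for things like scheduler overhead and IO bottlenecks in datacube.load().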
Now let's repeat the full example, with NDVI calculation and masking, but this time with dask and compute to load the data in.
First the dc.load()...
chunks = {"time":1}
This time, we will run the .compute() step straight away, resulting in real numbers being returned from Dask.
%%time
dataset = None # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling={"qa_pixel": "nearest", "*": "average"},
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = chunks,
group_by=group_by,
)
actual_dataset = dataset.compute()
CPU times: user 499 ms, sys: 81.9 ms, total: 581 ms Wall time: 3.95 s
actual_dataset
<xarray.Dataset>
Dimensions: (time: 22, y: 381, x: 335)
Coordinates:
* time (time) datetime64[ns] 2021-01-03T14:39:19.317361 ... 2021-12...
* y (y) float64 6.692e+06 6.692e+06 ... 6.681e+06 6.681e+06
* x (x) float64 8.572e+05 8.572e+05 ... 8.672e+05 8.672e+05
spatial_ref int32 32718
Data variables:
coastal (time, y, x) uint16 38342 38311 37994 37453 ... 9836 9832 9814
blue (time, y, x) uint16 38117 38116 37811 ... 10336 10392 10318
green (time, y, x) uint16 36410 36469 36214 ... 11842 11640 11518
red (time, y, x) uint16 36244 36332 36158 ... 13181 12926 12585
nir08 (time, y, x) uint16 35466 35587 35444 ... 14604 14630 14254
swir16 (time, y, x) uint16 28398 28591 28442 ... 17902 17175 16479
swir22 (time, y, x) uint16 22729 22865 22740 ... 16694 16075 15613
qa_pixel (time, y, x) uint16 22280 22280 22280 ... 21824 21824 21824
qa_aerosol (time, y, x) uint8 224 206 220 224 210 ... 166 160 106 96 96
qa_radsat (time, y, x) uint16 0 0 0 0 0 0 0 0 0 0 ... 0 0 0 0 0 0 0 0 0 0
Attributes:
crs: epsg:32718
grid_mapping: spatial_ref
Now use actual_dataset to compute the NDVI for all observation times.
# Identify pixels that don't have cloud, cloud shadow or water
cloud_free_mask = masking.make_mask(actual_dataset['qa_pixel'], **good_pixel_flags)
# Apply the mask
cloud_free = actual_dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free.nir08 - cloud_free.red
band_sum = cloud_free.nir08 + cloud_free.red
# Calculate NDVI
ndvi = None
ndvi = band_diff / band_sum
This completed very quickly because most of the time is in the data load; the actual calculation takes < 1 second.
Now let's repeat that entire load and NDVI calculation in a single cell and time it - this is just to get the total time for later comparison.
To ensure comparable timings, we will .restart() the Dask cluster. This makes sure that we aren't just seeing performance gains for data caching.
Note that this will show some Restarting worker warnings. That is OK; it is just telling you that each of the four workers in the cluster is restarting.
client.restart()
2023-05-24 15:45:33,401 - distributed.nanny - WARNING - Restarting worker 2023-05-24 15:45:33,402 - distributed.nanny - WARNING - Restarting worker 2023-05-24 15:45:33,418 - distributed.nanny - WARNING - Restarting worker 2023-05-24 15:45:33,420 - distributed.nanny - WARNING - Restarting worker
Client-c74f3414-fa49-11ed-99da-1eb1b782f397
Connection method: Cluster object | Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status
Workers: 4 | Total threads: 8 | Total memory: 29.00 GiB
Status: running | Using processes: True
Scheduler-04088dc5-463b-4f93-8651-13df22de11c5 | Started: 1 minute ago
(per-worker details omitted: 4 workers, each with 2 threads and 7.25 GiB memory)
%%time
dataset = None # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling={"qa_pixel": "nearest", "*": "average"},
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = chunks,
group_by=group_by,
)
actual_dataset = dataset.compute() ### Compute the dataset ###
# Identify pixels that don't have cloud, cloud shadow or water
cloud_free_mask = masking.make_mask(actual_dataset['qa_pixel'], **good_pixel_flags)
# Apply the mask
cloud_free = actual_dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free.nir08 - cloud_free.red
band_sum = cloud_free.nir08 + cloud_free.red
# Calculate NDVI
ndvi = None
ndvi = band_diff / band_sum
/env/lib/python3.10/site-packages/rasterio/warp.py:344: NotGeoreferencedWarning: Dataset has no geotransform, gcps, or rpcs. The identity matrix will be returned. _reproject( /env/lib/python3.10/site-packages/rasterio/warp.py:344: NotGeoreferencedWarning: Dataset has no geotransform, gcps, or rpcs. The identity matrix will be returned. _reproject(
CPU times: user 638 ms, sys: 160 ms, total: 798 ms Wall time: 6.09 s
The total time is around 10 seconds (for an 8-core cluster) or so. We can do better...
When compute() is called, Dask not only executes all the tasks but also consolidates all the distributed chunks back into a normal array on the client machine - in this case the notebook's kernel. In the previous cell we have two variables that both refer to the data we are loading:
delayed version of the data. The delayed tasks and the chunks that make it up will be on the clusterSo in the previous cell everything after the actual_dataset = dataset.compute() line is computed in the Jupyter kernel and doesn't use the dask cluster at all for computation.
If we shift the location of this compute() call we can perform more tasks in parallel on the dask cluster.
Tip: Locality is an important concept and applies to both data and computation
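The delayed-versus-computed distinction above can be illustrated with plain dask.array, independent of datacube. This is a minimal sketch: `lazy` and `total` are only task graphs until `compute()` pulls a concrete numpy result back into this process.

```python
import dask.array as da

lazy = da.ones((1000, 1000), chunks=(250, 250))  # delayed: a task graph, no data in memory
total = lazy.sum()                               # still delayed: just extends the graph

result = total.compute()  # graph executes on the workers; the result is
                          # consolidated back into this process as numpy
print(type(total))   # <class 'dask.array.core.Array'> - still a delayed object
print(float(result)) # 1000000.0 - a concrete value in local memory
```

Everything you do to `total` before `compute()` stays on the cluster; everything you do to `result` afterwards runs in the local kernel.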
Now let's repeat the load and NDVI calculation, but this time rather than calling compute() on the full dataset we'll run the compute at the cloud-masking step (cloud_free = dataset.where(cloud_free_mask).compute()) so the masking operation can be performed in parallel. Let's see what the impact is...
client.restart()
2023-05-24 15:45:40,716 - distributed.nanny - WARNING - Restarting worker
2023-05-24 15:45:40,722 - distributed.nanny - WARNING - Restarting worker
2023-05-24 15:45:40,723 - distributed.nanny - WARNING - Restarting worker
2023-05-24 15:45:40,729 - distributed.nanny - WARNING - Restarting worker
Client: Client-c74f3414-fa49-11ed-99da-1eb1b782f397
    Connection method: Cluster object | Cluster type: distributed.LocalCluster
    Dashboard: http://127.0.0.1:8787/status
Cluster: d46c9864
    Workers: 4 | Total threads: 8 | Total memory: 29.00 GiB
    Status: running | Using processes: True
Scheduler: Scheduler-04088dc5-463b-4f93-8651-13df22de11c5
    Comm: tcp://127.0.0.1:36203 | Started: 1 minute ago
Workers: 4 x (2 threads, 7.25 GiB memory), each with its own comm, dashboard, nanny and local directory under /tmp/dask-worker-space/
%%time
dataset = None  # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling={"qa_pixel": "nearest", "*": "average"},
output_crs=set_crs,
resolution=set_resolution,
dask_chunks=chunks,
group_by=group_by,
)
# Identify pixels that don't have cloud, cloud shadow or water
cloud_free_mask = masking.make_mask(dataset['qa_pixel'], **good_pixel_flags)
# Apply the mask
cloud_free = dataset.where(cloud_free_mask).compute() ### COMPUTE MOVED HERE ###
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free.nir08 - cloud_free.red
band_sum = cloud_free.nir08 + cloud_free.red
# Calculate NDVI
ndvi = band_diff / band_sum
actual_ndvi = ndvi
CPU times: user 865 ms, sys: 213 ms, total: 1.08 s
Wall time: 5.1 s
Not that different, but still a second or so quicker. This isn't too surprising, since the masking operation is pretty quick (it's all numpy) and the data load is the bulk of the processing.
Dask can see the entire task graph for both the load and the mask computation, so some of the computation can be performed concurrently with file IO and the CPUs are kept busier. But with IO dominating, we won't see much overall improvement.
Perhaps doing more of the calculation on the cluster will help. Let's also move ndvi.compute() so the entire calculation is done on the cluster and only the final result returned to the client.
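The two strategies can be sketched with plain dask arrays (illustrative data, not the datacube load). Computing early materialises the data locally, so every later step is eager numpy in the kernel; computing late sends the whole expression to the cluster as one graph.

```python
import dask.array as da
import numpy as np

x = da.random.random((1000, 1000), chunks=250)

# Strategy 1: compute early - everything after this line is eager numpy
early = x.compute()
result_early = (early - early.mean()) / (early + early.mean())

# Strategy 2: compute late - the whole expression is one graph on the cluster
expr = (x - x.mean()) / (x + x.mean())
result_late = expr.compute()

# dask's per-chunk random seeds are fixed at graph construction,
# so both strategies produce the same values
assert np.allclose(result_early, result_late)
```

The results are identical; only the location of the work changes, which is exactly what the timings in this section are measuring.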
client.restart()
2023-05-24 15:45:47,038 - distributed.nanny - WARNING - Restarting worker
2023-05-24 15:45:47,039 - distributed.nanny - WARNING - Restarting worker
2023-05-24 15:45:47,046 - distributed.nanny - WARNING - Restarting worker
2023-05-24 15:45:47,047 - distributed.nanny - WARNING - Restarting worker
(client/cluster summary unchanged: 4 workers, 8 threads, 29.00 GiB total memory; dashboard http://127.0.0.1:8787/status)
%%time
dataset = None # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling={"qa_pixel": "nearest", "*": "average"},
output_crs=set_crs,
resolution=set_resolution,
dask_chunks=chunks,
group_by=group_by,
)
# Identify pixels that don't have cloud, cloud shadow or water
cloud_free_mask = masking.make_mask(dataset['qa_pixel'], **good_pixel_flags)
# Apply the mask
cloud_free = dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free.nir08 - cloud_free.red
band_sum = cloud_free.nir08 + cloud_free.red
# Calculate NDVI
ndvi = band_diff / band_sum
actual_ndvi = ndvi.compute() ### COMPUTE MOVED HERE ###
/env/lib/python3.10/site-packages/rasterio/warp.py:344: NotGeoreferencedWarning: Dataset has no geotransform, gcps, or rpcs. The identity matrix will be returned.
  _reproject(
CPU times: user 358 ms, sys: 50.1 ms, total: 409 ms
Wall time: 2.43 s
Now we are seeing a huge difference!
You may be thinking "Hold on a sec, the NDVI calculation is pretty quick in this example with such a small dataset, why such a big difference?" - and you'd be right. There is more going on.
Remember that dataset is a task graph of delayed tasks, waiting to be executed when the result is required. The example dataset has many data variables available, but only three are used to produce the ndvi (qa_pixel, red and nir08). Dask therefore doesn't load the other variables, and because computation time in this case is mostly IO-bound, execution is a LOT faster.
Of course we can save dask the trouble of figuring this out on our behalf and only load() the measurements we need in the first place. Let's check that now; we should see a similar performance figure.
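This pruning behaviour can be checked with plain dask arrays standing in for the measurements (the names here are illustrative, and the check assumes dask's HighLevelGraph interface): a variable that never feeds into the final expression contributes no tasks to its graph.

```python
import dask.array as da

red = da.ones((100, 100), chunks=50)
nir08 = da.full((100, 100), 3.0, chunks=50)
unused = da.zeros((100, 100), chunks=50)  # "loaded", but never referenced below

ndvi = (nir08 - red) / (nir08 + red)

# The graph behind `ndvi` has no layer for `unused`, so computing ndvi
# never touches that array's (potentially expensive) load tasks.
layers = ndvi.__dask_graph__().layers
assert unused.name not in layers
assert red.name in layers and nir08.name in layers

print(float(ndvi.mean().compute()))  # (3 - 1) / (3 + 1) = 0.5
```

With datacube the "load" layers are real file IO, so skipping the unused ones is where most of the speed-up comes from.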
client.restart()
2023-05-24 15:45:50,702 - distributed.nanny - WARNING - Restarting worker
2023-05-24 15:45:50,708 - distributed.nanny - WARNING - Restarting worker
2023-05-24 15:45:50,728 - distributed.nanny - WARNING - Restarting worker
2023-05-24 15:45:50,756 - distributed.nanny - WARNING - Restarting worker
(client/cluster summary unchanged: 4 workers, 8 threads, 29.00 GiB total memory; dashboard http://127.0.0.1:8787/status)
%%time
dataset = None # clear results from any previous runs
measurements = ["qa_pixel", "red", "nir08"]
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling={"qa_pixel": "nearest", "*": "average"},
output_crs=set_crs,
resolution=set_resolution,
dask_chunks=chunks,
group_by=group_by,
)
# Identify pixels that don't have cloud, cloud shadow or water
cloud_free_mask = masking.make_mask(dataset['qa_pixel'], **good_pixel_flags)
# Apply the mask
cloud_free = dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free.nir08 - cloud_free.red
band_sum = cloud_free.nir08 + cloud_free.red
# Calculate NDVI
ndvi = band_diff / band_sum
actual_ndvi = ndvi.compute()
CPU times: user 302 ms, sys: 62.7 ms, total: 364 ms
Wall time: 2.44 s
Pretty similar, as expected, but again a slight improvement because there is now less overhead and a smaller task graph.
It can pay to give dask a hand and not clutter the task graph with tasks you aren't going to use. Still, it's nice to see that dask can save you some time by only computing what is required, when you need it.
For completeness we will take a look at the task graph for the full calculation, all the way to the NDVI result. Given the complexity of the full graph we'll simplify it to 2 time observations like we did when the task graph was introduced previously.
set_time = ("2021-01-01", "2021-01-14")
client.restart()
2023-05-24 15:45:54,469 - distributed.nanny - WARNING - Restarting worker
2023-05-24 15:45:54,470 - distributed.nanny - WARNING - Restarting worker
2023-05-24 15:45:54,477 - distributed.nanny - WARNING - Restarting worker
2023-05-24 15:45:54,478 - distributed.nanny - WARNING - Restarting worker
(client/cluster summary unchanged: 4 workers, 8 threads, 29.00 GiB total memory; dashboard http://127.0.0.1:8787/status)
%%time
dataset = None # clear results from any previous runs
measurements = ["qa_pixel", "red", "nir08"]
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling={"qa_pixel": "nearest", "*": "average"},
output_crs=set_crs,
resolution=set_resolution,
dask_chunks=chunks,
group_by=group_by,
)
# Identify pixels that don't have cloud, cloud shadow or water
cloud_free_mask = masking.make_mask(dataset['qa_pixel'], **good_pixel_flags)
# Apply the mask
cloud_free = dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free.nir08 - cloud_free.red
band_sum = cloud_free.nir08 + cloud_free.red
# Calculate NDVI
ndvi = band_diff / band_sum
CPU times: user 40.7 ms, sys: 8.63 ms, total: 49.4 ms
Wall time: 57 ms
ndvi.data.visualize()
The computation flows from bottom to top in the task graph. You can see there are two main paths, one for each time step (since the time chunk length is 1). You can also see the three data sources are loaded independently. After that it gets a little harder to follow, but you can see qa_pixel being used to produce the mask (and_, eq), which is then combined with the other two measurements via the where function. Finally comes the NDVI calculation - a subtraction (sub), addition (add) and division (truediv).
Dask has lots of internal optimisations that it uses to identify the dependencies and parallel components of a task graph. Sometimes it will reorder or prune operations where possible to optimise further (for example, not loading data variables that aren't used in the NDVI calculation).
Tip: The task graph can be complex but it is a useful tool in understanding your algorithm and how it scales.
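Note that visualize() requires graphviz to be installed. When it isn't available, the same graph can still be inspected programmatically; a small sketch with plain dask arrays (assuming the __dask_graph__ interface):

```python
import dask.array as da

a = da.random.random((100, 100), chunks=50)   # 2 x 2 = 4 chunks
expr = (a - a.mean()) / (a + a.mean())

graph = expr.__dask_graph__()
print(len(graph.layers))   # number of named operations (layers) in the graph
print(len(dict(graph)))    # total number of individual tasks across all chunks
```

Watching how these counts grow as you increase the number of chunks or the length of the pipeline is a quick way to gauge how an algorithm will scale before running it.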
client.close()
cluster.close()